-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(source): add NATS source consumer parameters #17615
feat(source): add NATS source consumer parameters #17615
Conversation
Hi. Can you help to rebase the commits first? |
e3ce905
to
6f3f8f9
Compare
e044338
to
90a675b
Compare
durable_name, | ||
description, | ||
ack_wait: ack_wait.unwrap_or_default(), | ||
max_deliver: max_deliver.unwrap_or_default(), | ||
filter_subject: filter_subject.unwrap_or_default(), | ||
filter_subjects: filter_subjects.unwrap_or_default(), | ||
rate_limit: rate_limit.unwrap_or_default(), | ||
sample_frequency: sample_frequency.unwrap_or_default(), | ||
max_waiting: max_waiting.unwrap_or_default(), | ||
max_ack_pending: max_ack_pending.unwrap_or_default(), | ||
// idle_heartbeat: idle_heart.unwrap_or_default(), | ||
max_batch: max_batch.unwrap_or_default(), | ||
max_bytes: max_bytes.unwrap_or_default(), | ||
max_expires: max_expires.unwrap_or_default(), | ||
inactive_threshold: inactive_threshold.unwrap_or_default(), | ||
memory_storage: memory_storage.unwrap_or_default(), | ||
backoff: backoff.unwrap_or_default(), | ||
num_replicas: num_replicas.unwrap_or_default(), | ||
..Default::default() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These .unwrap_or_default()
s have subtly different semantics from ..Default::default()
, although in current fact they behave the same because Config
is just deriving Default
but not manually implementing it.
I'm OK with this but we should be aware of the difference.
sample_frequency: Option<u8>, | ||
max_waiting: Option<i64>, | ||
max_ack_pending: Option<i64>, | ||
_idle_heartbeat: Option<Duration>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this deprecated? May just remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well the behavior of the missed idle heartbeat is a bit cryptic with the ‘async_nats’ crate and is a relatively important parameter user because it translates into error messages sent by the stream in the form of “missed idle heartbeat”, which can cause pipelines to fail. Not sure how to handle this one tbh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed from the configuration, since it doesn't exist in the pull consumer config: https://docs.rs/async-nats/latest/async_nats/jetstream/consumer/pull/struct.Config.html
Suggest following the practice at #11203 |
properties.ack_wait.clone().map(|s| { | ||
Duration::from_secs(s.parse::<u64>().expect("failed to parse ack_wait to u64")) | ||
}), | ||
properties.max_deliver.clone().map(|s| { | ||
s.parse::<i64>() | ||
.expect("failed to parse max_deliver to i64") | ||
}), | ||
properties.filter_subject.clone(), | ||
properties | ||
.filter_subjects | ||
.clone() | ||
.map(|s| s.split(',').map(|s| s.to_string()).collect()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These don't look good. We'd better use typed properties, and parse the config when creating the properties struct.
So that CREATE SOURCE
will reject invalid config, instead of failing at runtime.
e.g., using #[serde_as(as = "Option<DisplayFromStr>")]
or #[serde(deserialize_with = ...)]
like other connectors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah indeed that seems much better, even without understanding the error related to config parsing that parsing seemed a bit sketchy. I’ll give it a try in the upcoming days @xxchan thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've moved the consumer specific parameters into a separate struct, and added some typed properties based on @xxchan's suggestion
Just added a commit to address the comments from previous reviewers! Sorry for coming back late on this 🙏 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, can approve if others don't have further comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest LGTM
src/connector/src/source/nats/mod.rs
Outdated
pub max_bytes: Option<i64>, | ||
|
||
#[serde(rename = "consumer.max_expires")] | ||
#[serde_as(as = "Option<DurationSeconds<String>>")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
#[serde(rename = "consumer.ack_policy")] | ||
#[serde_as(as = "Option<DisplayFromStr>")] | ||
pub ack_policy: Option<String>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can try to deser it into Enum directly, serde can help us reject unrecognized str.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to do this, but unfortunately couldn't find a clean way to do it -- the enums from the NATS crate do not implement FromStr
, so it doesn’t seem possible to deserialize directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nevermind, I will try refactor it later. Thanks for your contribution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
220fded
Co-authored-by: benjamin-awd <benjamindornel@gmail.com> (cherry picked from commit 220fded)
Co-authored-by: benjamin-awd <benjamindornel@gmail.com>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR adds the 20 missing parameters supported in the async_nats crate of the consumer created in NATS sources. The DDL of a NATS source would now look like:
All parameters are optional. If left empty, it falls back to the
async_nats
default values. All are eventually parsed as strings in RisingWave and casted to the appropriate types before being passed to thebuild_consumer
method.Checklist
./risedev check
(or alias,./risedev c
)Release note
Now, all the parameters supported by the
async_nats
crate are supported in the RisingWave NATS source connector. In particular, users can now configure few important aspects of consumers such as the number of replicas that previously would default to 1, hereby causing Jetstream issues of synchronization across replicas. It also allows setting parameters such asmax_waiting
ormax_ack_pending
that can be critical for performance optimization of NATS clusters.No backward incompatibility would be introduced by these changes.